Add savepoints for parquet migration #261
base: master
Conversation
Add skipParquetFiles field to track already-processed Parquet files during sequential migrations with savepoints. This allows the migrator to resume from where it left off by skipping files that have already been successfully migrated. Also add getSkipParquetFilesOrEmptySet() helper method for convenient access with a sensible default.
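For illustration, a minimal sketch of the new field and helper, assuming the names from the commit message above; the other fields of `MigratorConfig` are elided and the overall shape of the class is an assumption:

```scala
// Hypothetical excerpt: only skipParquetFiles and its accessor come from this PR's
// description; the rest of MigratorConfig is elided.
case class MigratorConfig(
  // ... other existing configuration fields elided ...
  skipParquetFiles: Option[Set[String]]
) {
  /** Parquet files recorded as processed by a previous run, or an empty set if none. */
  def getSkipParquetFilesOrEmptySet(): Set[String] =
    skipParquetFiles.getOrElse(Set.empty)
}
```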
Add StringSetAccumulator, a Spark accumulator for efficiently tracking a set of strings across distributed operations. This will be used to track which Parquet files have been processed during migration. The accumulator provides: - Thread-safe add() operation for marking items as processed - Set-based storage to avoid duplicates - Proper Spark accumulator semantics for distributed execution Include comprehensive unit tests covering initialization, adding elements, merging accumulators, and handling duplicates.
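A hedged usage sketch of such an accumulator (the full class body appears further down in this PR; the SparkSession setup and file paths here are purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

object AccumulatorUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder().master("local[*]").appName("accumulator-sketch").getOrCreate()

    // Assumes the StringSetAccumulator class added by this PR is on the classpath.
    val processedFiles = new StringSetAccumulator(Set.empty)
    // Registering the accumulator makes executor-side updates visible on the driver.
    spark.sparkContext.register(processedFiles, "processedParquetFiles")

    spark.sparkContext
      .parallelize(Seq("s3a://bucket/a.parquet", "s3a://bucket/b.parquet"))
      .foreach(path => processedFiles.add(path)) // runs on the executors

    println(processedFiles.value) // driver-side view of the merged set
    spark.stop()
  }
}
```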
Introduce ParquetProcessingStrategy trait to enable different migration strategies for Parquet sources. This applies the Strategy pattern to allow switching between parallel (default) and sequential (with savepoints) processing modes. The interface defines a single migrate() method that implementations will use to execute their specific migration logic.
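A hedged sketch of what such a trait could look like; `MigratorConfig` and `TargetSettings` below are simplified stand-ins for the project's own types, and the exact signature in the PR may differ:

```scala
import org.apache.spark.sql.SparkSession

// Stand-in types so the sketch is self-contained; the real project types differ.
case class MigratorConfig(skipParquetFiles: Option[Set[String]] = None)
case class TargetSettings(keyspace: String, table: String)

trait ParquetProcessingStrategy {
  /** Execute the Parquet-to-Scylla migration using this strategy's processing model. */
  def migrate(config: MigratorConfig, target: TargetSettings)(
    implicit spark: SparkSession): Unit
}
```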
Implement the parallel processing strategy that reads all Parquet files at once using Spark's native parquet reader. This is the traditional approach that maximizes parallelism and performance. Key characteristics: - Reads all files in one operation - No savepoint support (savepointsSupported = false) - Best performance for migrations expected to complete uninterrupted - Default behavior matching existing migrations This strategy maintains backward compatibility with the original Parquet migration behavior.
Extract ScyllaMigratorBase trait to enable different savepoint strategies. This refactoring introduces extension points that allow external savepoints managers to be injected, enabling file-by-file Parquet migration with savepoints. Key changes: - Extract ScyllaMigratorBase trait with template method pattern - Add externalSavepointsManager hook for injecting custom managers - Add createSavepointsManager() for strategy-specific manager creation - Add shouldCloseManager() to control manager lifecycle - Refactor migrate() to use external or created manager - Update exception handling to support different accumulator types The existing ScyllaMigrator object maintains full backward compatibility by implementing the trait with original CQL savepoints behavior.
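A hedged sketch of the template-method shape described above; the hook names follow the commit message, but the surrounding types and signatures are assumptions rather than the actual code in ScyllaMigrator.scala:

```scala
// Sketch: SavepointsManagerLike stands in for the project's SavepointsManager.
trait SavepointsManagerLike {
  def dumpMigrationState(reason: String): Unit
  def close(): Unit
}

trait ScyllaMigratorBaseSketch {
  // Hook: an externally created manager (used by the Parquet path), if any.
  protected def externalSavepointsManager: Option[SavepointsManagerLike]
  // Hook: strategy-specific manager creation (used by the CQL path).
  protected def createSavepointsManager(): Option[SavepointsManagerLike]
  // Hook: whether this migrator owns the manager's lifecycle.
  protected def shouldCloseManager(manager: SavepointsManagerLike): Boolean

  protected def writeDataFrame(): Unit

  // Template method: the migration skeleton is fixed, the hooks vary per subclass.
  def migrate(): Unit = {
    val manager = externalSavepointsManager.orElse(createSavepointsManager())
    try writeDataFrame()
    finally manager.foreach { m =>
      m.dumpMigrationState("final")
      if (shouldCloseManager(m)) m.close()
    }
  }
}
```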
Introduce ScyllaParquetMigrator class to handle Parquet migrations with external savepoints management. This enables file-by-file processing where each file's completion is tracked. Key design decisions: - Accepts external ParquetSavepointsManager in constructor - Does not create its own savepoints manager (returns None) - Does not close the external manager (returns false from shouldCloseManager), allowing the caller to manage lifecycle This separation of concerns allows SequentialParquetStrategy to orchestrate multiple migrations while maintaining a single savepoints manager across all file operations.
Implement savepoints manager for tracking processed Parquet files. Uses StringSetAccumulator to maintain state across Spark operations and persists the set of processed files to configuration. Key features: - Tracks processed files via StringSetAccumulator - Implements SavepointsManager interface for consistency - Updates MigratorConfig with skipParquetFiles on state dump - Factory method initializes accumulator from existing skipParquetFiles This enables resumable Parquet migrations by remembering which files have been successfully processed.
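A hedged sketch of the manager's core state handling. The PR uses its own StringSetAccumulator; here Spark's built-in CollectionAccumulator stands in so the example is self-contained, and the config plumbing is reduced to a getter:

```scala
import org.apache.spark.SparkContext
import scala.jdk.CollectionConverters._

// Sketch only: the real ParquetSavepointsManager implements the migrator's
// SavepointsManager interface and writes skipParquetFiles back into the config.
class ParquetSavepointsManagerSketch(sc: SparkContext, alreadyProcessed: Set[String]) {
  // collectionAccumulator both creates and registers the accumulator.
  private val acc = sc.collectionAccumulator[String]("processedParquetFiles")

  /** Called (e.g. from a Spark listener) once every partition of a file has completed. */
  def markFileAsProcessed(path: String): Unit = acc.add(path)

  /** The set that would be persisted as skipParquetFiles on each state dump. */
  def processedFiles: Set[String] = alreadyProcessed ++ acc.value.asScala.toSet
}
```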
Implement sequential Parquet processing strategy that reads and migrates files one at a time with savepoint support. Each file's successful migration is recorded before moving to the next. Key features: - Processes files sequentially using prepareParquetReader - Skips already-processed files from skipParquetFiles config - Uses ParquetSavepointsManager to track progress - Marks each file as processed after successful migration - Creates final savepoint when all files complete - Uses Using.resource for proper manager cleanup Trade-offs: - Lower parallelism than parallel mode (processes one file at a time) - Enables resumability for long-running migrations - Ideal for large datasets or unreliable environments
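A hedged sketch of that sequential loop; `migrateOneFile` and `ManagerSketch` are simplified stand-ins for the real migrator call and ParquetSavepointsManager, and `Using.resource` requires Scala 2.13's `scala.util.Using`:

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Using

object SequentialParquetSketch {
  def run(allFiles: Seq[String], alreadyProcessed: Set[String])(
    implicit spark: SparkSession): Unit = {

    // Simplified stand-in for ParquetSavepointsManager.
    final class ManagerSketch extends AutoCloseable {
      private var processed = alreadyProcessed
      def markFileAsProcessed(f: String): Unit = processed += f
      def dumpMigrationState(reason: String): Unit =
        println(s"savepoint [$reason]: ${processed.size} files recorded")
      def close(): Unit = dumpMigrationState("close")
    }

    // Placeholder for the real write to Scylla.
    def migrateOneFile(path: String): Unit = {
      val rows = spark.read.parquet(path).count()
      println(s"migrated $rows rows from $path")
    }

    Using.resource(new ManagerSketch) { manager =>
      allFiles.filterNot(alreadyProcessed).foreach { file =>
        migrateOneFile(file)               // write this file's rows to the target
        manager.markFileAsProcessed(file)  // record progress only after success
        manager.dumpMigrationState("file") // savepoint after each file
      }
      manager.dumpMigrationState("final")
    }
  }
}
```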
Implement utility methods for Parquet file discovery and AWS credential configuration to support both parallel and sequential processing modes. New functionality: - listParquetFiles(): Discovers all Parquet files at given path using Spark's recursive file lookup - prepareParquetReader(): Creates ParquetReaderWithSavepoints that filters out already-processed files - configureHadoopCredentials(): Sets up S3A credentials including support for session tokens and regional endpoints The configureHadoopCredentials method now includes comprehensive documentation explaining Hadoop's TemporaryAWSCredentialsProvider requirement and links to official documentation. These utilities enable the sequential strategy to enumerate files and track progress while maintaining credential compatibility.
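A hedged sketch of the S3A credential setup this describes. The Hadoop property names are standard S3A keys and the TemporaryAWSCredentialsProvider class is Hadoop's own, but the method name and parameter list are assumptions, not the PR's exact code:

```scala
import org.apache.spark.SparkContext

object S3CredentialsSketch {
  // Illustrative: the real method reads these values from the migrator's source settings.
  def configureHadoopCredentials(sc: SparkContext,
                                 accessKey: String,
                                 secretKey: String,
                                 sessionToken: Option[String],
                                 region: Option[String]): Unit = {
    val conf = sc.hadoopConfiguration
    conf.set("fs.s3a.access.key", accessKey)
    conf.set("fs.s3a.secret.key", secretKey)
    sessionToken.foreach { token =>
      // Temporary credentials (with a session token) require Hadoop's
      // TemporaryAWSCredentialsProvider instead of the default provider chain.
      conf.set("fs.s3a.session.token", token)
      conf.set(
        "fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    }
    region.foreach(r => conf.set("fs.s3a.endpoint.region", r))
  }
}
```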
Complete the Strategy pattern implementation by adding strategy selection logic based on parquetProcessingMode configuration. The migrateToScylla method now: - Reads parquetProcessingMode from config.savepoints - Instantiates ParallelParquetStrategy (default) or SequentialParquetStrategy based on mode - Delegates migration to the selected strategy - Logs the selected mode for observability This provides a clean separation between the two migration approaches while maintaining backward compatibility (parallel is default).
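A hedged sketch of that selection logic; the enum and strategy names follow the commit messages, while construction details and the no-argument `migrate()` are simplified:

```scala
// Stand-in types so the example is self-contained.
sealed trait ParquetProcessingMode
object ParquetProcessingMode {
  case object Parallel extends ParquetProcessingMode
  case object Sequential extends ParquetProcessingMode
}

trait ParquetProcessingStrategy { def migrate(): Unit }
class ParallelParquetStrategy extends ParquetProcessingStrategy {
  def migrate(): Unit = println("reading all Parquet files in one pass")
}
class SequentialParquetStrategy extends ParquetProcessingStrategy {
  def migrate(): Unit = println("reading Parquet files one at a time with savepoints")
}

object ParquetMigrationSketch {
  def migrateToScylla(mode: ParquetProcessingMode): Unit = {
    val strategy = mode match {
      case ParquetProcessingMode.Parallel   => new ParallelParquetStrategy
      case ParquetProcessingMode.Sequential => new SequentialParquetStrategy
    }
    println(s"Parquet processing mode: $mode") // logged for observability
    strategy.migrate()
  }
}
```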
Add comprehensive unit tests for the Parquet savepoints feature: ParquetSavepointsTest: - Tests ParquetSavepointsManager state tracking - Verifies file marking and accumulator behavior - Tests config update with processed files - Ensures proper state serialization ParquetConfigSerializationTest: - Tests Circe encoding/decoding of ParquetProcessingMode - Verifies parallel and sequential mode serialization - Tests Savepoints config with parquetProcessingMode field - Validates backward compatibility with missing field - Tests error handling for invalid mode values These tests ensure configuration serialization and state management work correctly across migration restarts.
Add integration test suite for end-to-end Parquet migration validation: ParquetMigratorSuite: - Base test utilities for Parquet migration testing - Shared fixtures and helper methods - Test data generation and validation ParquetParallelModeTest: - Integration test for parallel processing mode - Verifies default behavior and performance characteristics ParquetSavepointsIntegrationTest: - Integration test for sequential mode with savepoints - Tests migration interruption and resumption - Verifies skipParquetFiles configuration handling - Validates that already-processed files are skipped - Ensures data consistency after resume These tests validate the complete migration flow including Spark DataFrame operations, ScyllaDB writes, and savepoint persistence.
Add YAML configuration files for testing both Parquet processing modes: parquet-to-scylla-parallel.yaml: - Configuration for parallel processing mode (default) - No parquetProcessingMode specified (uses default) - Validates backward compatibility parquet-to-scylla-savepoints.yaml: - Configuration for sequential processing with savepoints - Sets parquetProcessingMode: sequential - Example configuration for resumable migrations These configurations serve as both test fixtures and documentation examples for users implementing Parquet migrations.
Simplify ParquetToScyllaBasicMigrationTest to work with the new Strategy pattern architecture. The test now uses the standard Parquet migration path which automatically selects the appropriate strategy based on configuration. Changes: - Use Parquet.migrateToScylla instead of direct DataFrame creation - Remove manual DataFrame construction - Rely on parallel strategy (default) for basic migration test - Maintain same test coverage with cleaner implementation This refactoring validates that the new architecture maintains backward compatibility with existing migration workflows.
Introduce ParquetProcessingMode enum with two variants: - Parallel: Process all Parquet files at once (default, no savepoints) - Sequential: Process files one by one with savepoint support Add parquetProcessingMode field to Savepoints configuration with appropriate Circe encoders/decoders. The mode defaults to Parallel to maintain backward compatibility with existing configurations. This lays the foundation for implementing savepoint support for Parquet migrations, allowing users to resume interrupted migrations without reprocessing already-migrated files.
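A hedged sketch of the enum with Circe string codecs; the exact derivation used in the PR may differ, and the `"parallel"` / `"sequential"` string values are assumptions based on the YAML examples mentioned above:

```scala
import io.circe.{ Decoder, Encoder }

sealed trait ParquetProcessingMode
object ParquetProcessingMode {
  case object Parallel extends ParquetProcessingMode
  case object Sequential extends ParquetProcessingMode

  implicit val encoder: Encoder[ParquetProcessingMode] =
    Encoder.encodeString.contramap {
      case Parallel   => "parallel"
      case Sequential => "sequential"
    }

  implicit val decoder: Decoder[ParquetProcessingMode] =
    Decoder.decodeString.emap {
      case "parallel"   => Right(Parallel)
      case "sequential" => Right(Sequential)
      case other        => Left(s"Unknown parquetProcessingMode: $other")
    }
}

// In the Savepoints config the field would be optional, e.g.
//   parquetProcessingMode: Option[ParquetProcessingMode]  // .getOrElse(Parallel)
// so that existing configurations keep the parallel behavior.
```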
Pull Request Overview
This PR adds comprehensive savepoints support for Parquet-to-Scylla migrations, enabling resumable data migrations by tracking processed Parquet files at a granular level. The implementation handles both single-partition and multi-partition files, supports parallel processing, and provides detailed progress tracking through Spark listeners.
Key changes:
- Implemented file-level savepoints tracking with partition awareness to support resumable Parquet migrations
- Refactored ScyllaMigrator into a trait-based architecture for extensibility with specialized Parquet migration logic
- Added FileCompletionListener to track Spark task completion and aggregate partition-level events into file-level completion
Reviewed Changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| ParquetToScyllaBasicMigrationTest.scala | Refactored to use new ParquetMigratorSuite base class with fixture-based resource management |
| ParquetSavepointsTest.scala | Unit tests for Parquet file listing, filtering, and savepoints manager state tracking |
| ParquetSavepointsIntegrationTest.scala | Integration test verifying savepoints include all processed files after migration |
| ParquetResumeIntegrationTest.scala | Tests for migration resumption after interruption and idempotency with pre-processed files |
| ParquetParallelModeTest.scala | Tests parallel Parquet file processing with savepoints in both simple and concurrent scenarios |
| ParquetMultiPartitionTest.scala | Tests handling of large Parquet files split across multiple Spark partitions |
| ParquetMigratorSuite.scala | New base test suite providing fixtures for tables, Parquet directories, and savepoints |
| PartitionMetadataReaderTest.scala | Tests for reading Spark partition metadata and building file-to-partition mappings |
| FileCompletionListenerTest.scala | Tests for Spark listener tracking partition completion and file-level aggregation |
| ParquetConfigSerializationTest.scala | Tests YAML serialization/deserialization of skipParquetFiles configuration |
| StringSetAccumulatorTest.scala | Tests thread-safe string set accumulator for tracking processed files |
| parquet-to-scylla-*.yaml | Configuration files for various test scenarios including savepoints, resume, and parallel modes |
| .gitignore | Updated to track spark-master directory for savepoints storage |
| ScyllaMigrator.scala | Refactored into trait-based architecture with specialized ScyllaParquetMigrator implementation |
| PartitionMetadataReader.scala | Reads Spark partition metadata to build file-partition mappings for tracking |
| ParquetSavepointsManager.scala | Manages Parquet savepoints state using string set accumulator for processed files |
| Parquet.scala | Refactored to implement parallel file processing with savepoints and file completion tracking |
| FileCompletionListener.scala | Spark listener aggregating partition completions into file-level tracking |
| MigratorConfig.scala | Added skipParquetFiles field and helper method for configuration |
| StringSetAccumulator.scala | Thread-safe accumulator for collecting processed file paths across Spark tasks |
| Migrator.scala | Updated Parquet-to-Scylla migration path to use new migrateToScylla method |
Force-pushed 7df7cdc to 076ccad
spark.sparkContext.addSparkListener(listener)

try {
  val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)
why false when you support savepoints now?
I think this needs a closer look: the savepoints manager basically needs to dump its state to the savepoint file periodically (and on failure, of course), and I am wondering whether that periodic write to the savepoint file will still happen if this is false?!
(Or is the manager self-contained, so that it will dump savepoints automatically once it's instantiated?)
> why false when you support savepoints now?
Basically, this false flag means no savepoints manager is created for the basic ScyllaMigrator, and it is not used in ParquetMigrator at all.
It's necessary so that no additional SavepointsManager is created, since we already have an external one created in the Parquet object.
Actually, I agree the current architecture / naming is confusing.
> (Or is the manager self-contained, so that it will dump savepoints automatically once it's instantiated?)
Yeah, it is 😌
  }
}

/**
The code below has nothing to do with Parquet, but OK, I'll turn a blind eye. PR-cleanliness-wise, though, it just confuses people, so unrelated changes should go in separate PRs/commits.
Yes, thank you. Looks like a leftover from the earlier approach that had both parallel and sequential migrations.
  sourceDF: SourceDataFrame
)(implicit spark: SparkSession): Option[SavepointsManager]

protected def shouldCloseManager(manager: SavepointsManager): Boolean
Why do we need this? I mean, the manager will be closed anyway and should always be closed; that's why it periodically dumps its state, and of course it should close cleanly when any failure happens.
savePointsManger.dumpMigrationState("final")
savePointsManger.close()
if (shouldCloseManager(savePointsManger)) {
  savePointsManger.close()
What will happen with the Parquet manager? Who will close it then? Is this a leak? :-D
Yeah, the problem here is probably the over-unification of different savepoints approaches:
- The common ScyllaMigrator creates a CQL SavepointsManager on its own
- The Parquet file migrator uses an external SavepointsManager created in Parquet.scala (because it has to be used inside our custom Spark listener)

The idea is:
- The Parquet object is responsible for opening the SavepointsManager, so the same object should be responsible for closing it (and that's done safely with a Using statement)

However, it makes sense to reconsider the approach to avoid unnecessary mess 🤔
tarzanek left a comment:
please check my comments
Pull request overview
Copilot reviewed 29 out of 29 changed files in this pull request and generated 1 comment.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
I have no other comment from my end, so let's merge.
tarzanek left a comment:
Looks OK. We still have a real-life test in flight for this, so let's merge if no problems are seen.
val df = if (skipFiles.isEmpty) {
  spark.read.parquet(source.path)
} else {
  spark.read.parquet(filesToProcess: _*)
shuffle files before passing them over?
or is spark shuffling this by itself?
Spark handles partition sizing on its own using spark.sql.files.maxPartitionBytes, so:
- small files will be combined into a single partition
- big files will be split across several partitions

However, I can add scala.util.Random.shuffle to be safer; the overhead is negligible.
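For reference, a minimal sketch of what that shuffle could look like (`filesToProcess` mirrors the variable in the diff above; this is not code from the PR):

```scala
import org.apache.spark.sql.{ DataFrame, SparkSession }
import scala.util.Random

object ShuffleSketch {
  // Illustrative: randomize the explicit file list before handing it to Spark, so the
  // remaining big files are not all clustered together on a resumed run.
  def readShuffled(filesToProcess: Seq[String])(implicit spark: SparkSession): DataFrame =
    spark.read.parquet(Random.shuffle(filesToProcess): _*)
}
```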
.rdd
.mapPartitionsWithIndex { (partitionId, iter) =>
  val files = iter.map(row => row.getString(0)).toSet
  files.map(filename => (partitionId, filename)).iterator
How does this behave when you have lots of 100-200 MB files and a few files that are around 5 GB? How does the splitting work?
Before merging we are checking the logic of the partition-to-file mapping. In tests of a case with lots of small (~200 MB) files and a few big (~5 GB) files (with large partitions!), we saw delays and problems; if the mapping isn't correct, the job might not finish.
…la-migrator into parquet-savepoint-rebased
The Alternator test might be impacted by the 2025.4 release: https://forum.scylladb.com/t/release-scylladb-2025-4-0/5222
or better do
Note: https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
Force-pushed 869809f to 324c0e7
Pull request overview
Copilot reviewed 40 out of 41 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
migrator/src/main/scala/com/scylladb/migrator/scylla/ScyllaMigrator.scala:95
- The error handling in the catch block swallows the exception after logging it. This means that even if the migration fails with an error, the method will not propagate the failure to the caller. The application will appear to complete successfully despite data not being fully migrated. The exception should be re-thrown after dumping the savepoint state to ensure proper error propagation and to prevent silent failures.
} catch {
  case NonFatal(e) => // Catching everything on purpose to try and dump the accumulator state
    log.error(
      "Caught error while writing the DataFrame. Will create a savepoint before exiting",
      e)
private def checkFileCompletion(filename: String): Unit = {
  if (completedFiles.contains(filename)) {
    return
  }

  fileToPartitions.get(filename) match {
    case Some(allPartitions) =>
      val allComplete = allPartitions.forall(completedPartitions.contains)

      if (allComplete) {
        if (completedFiles.putIfAbsent(filename, true).isEmpty) {
          savepointsManager.markFileAsProcessed(filename)

          val progress = s"${completedFiles.size}/${fileToPartitions.size}"
          log.info(s"File completed: $filename (progress: $progress)")
        }
Copilot AI commented on Jan 22, 2026:
Race condition in concurrent file completion checking. Between the check at line 58 (if (completedFiles.contains(filename))) and the putIfAbsent at line 67, multiple threads could pass the first check and all attempt to mark the file as complete. While putIfAbsent prevents duplicate entries in the map, this could result in savepointsManager.markFileAsProcessed(filename) being called multiple times for the same file. Consider moving the completion check inside a synchronized block or using completedFiles.putIfAbsent earlier to gate the entire completion logic.
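A hedged sketch of the gating Copilot suggests, with `putIfAbsent` as the single entry point to the completion branch; `TrieMap` and the constructor parameters here are assumptions standing in for whatever concurrent structures the listener actually uses:

```scala
import scala.collection.concurrent.TrieMap

// Sketch: completedFiles / completedPartitions / fileToPartitions mirror the names in
// the diff above, but their types here are assumptions.
class CompletionTrackerSketch(fileToPartitions: Map[String, Set[Int]],
                              markProcessed: String => Unit) {
  private val completedPartitions = TrieMap.empty[Int, Boolean]
  private val completedFiles = TrieMap.empty[String, Boolean]

  def onPartitionCompleted(partitionId: Int, filename: String): Unit = {
    completedPartitions.putIfAbsent(partitionId, true)
    val allComplete =
      fileToPartitions.get(filename).exists(_.forall(completedPartitions.contains))
    // putIfAbsent gates the whole completion branch, so markProcessed runs at most once
    // per file even if several partitions finish concurrently.
    if (allComplete && completedFiles.putIfAbsent(filename, true).isEmpty) {
      markProcessed(filename)
    }
  }
}
```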
Using.resource(ParquetSavepointsManager(config, spark.sparkContext)) { savepointsManager =>
  val listener = new FileCompletionListener(
    partitionToFiles,
    fileToPartitions,
    savepointsManager
  )
  spark.sparkContext.addSparkListener(listener)

  try {
    val sourceDF = SourceDataFrame(df, None, savepointsSupported = false)

    log.info("Created DataFrame from Parquet source")

    ScyllaParquetMigrator.migrate(config, target, sourceDF, savepointsManager)

    savepointsManager.dumpMigrationState("completed")

    log.info(
      s"Parquet migration completed successfully: " +
        s"${listener.getCompletedFilesCount}/${listener.getTotalFilesCount} files processed")

  } finally {
    spark.sparkContext.removeSparkListener(listener)
    log.info(s"Final progress: ${listener.getProgressReport}")
  }
}
Copilot AI commented on Jan 22, 2026:
Resource management concern: The ParquetSavepointsManager is closed by the Using.resource block, but it's passed to ScyllaParquetMigrator.migrate which internally marks it with shouldCloseManager = false to prevent double-closing. However, this creates a brittle design where the lifecycle management is split between two places. If an exception occurs during migration (line 80), the savepointsManager.dumpMigrationState("completed") at line 82 will never be called, and only the "final" dump in ScyllaMigrator will execute. Consider consolidating the resource management logic or adding explicit error handling to ensure proper state dumping in all scenarios.
val df = if (skipFiles.isEmpty) {
  spark.read.parquet(source.path)
} else {
  spark.read.parquet(filesToProcess: _*)
}
Copilot AI commented on Jan 22, 2026:
Potential inconsistency in DataFrame creation. When skipFiles.isEmpty is true (line 52), the code reads from source.path which may include newly added files that weren't in the original allFiles list. However, when skipFiles is not empty, it reads from the specific filesToProcess list. This could lead to different behavior: in the first case, files added after the listParquetFiles call would be processed, while in the second case they would not. Consider always using the explicit file list for consistency.
Suggested change:
- val df = if (skipFiles.isEmpty) {
-   spark.read.parquet(source.path)
- } else {
-   spark.read.parquet(filesToProcess: _*)
- }
+ val df = spark.read.parquet(filesToProcess: _*)
import io.circe.Codec
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto._
Copilot AI commented on Jan 22, 2026:
Missing documentation for the new enableParquetFileTracking parameter. The Savepoints case class should include Scaladoc comments explaining what this parameter does, when it should be set to true vs false, and its default value. This is especially important since it controls a significant behavior change in how Parquet migrations work.
Suggested change:
+ /**
+  * Configuration for periodic savepoints written during a migration run.
+  *
+  * @param intervalSeconds
+  *   How often, in seconds, a savepoint should be written.
+  * @param path
+  *   Filesystem path (directory or prefix) where savepoint data will be stored.
+  * @param enableParquetFileTracking
+  *   When `true` (the default), enables tracking of already-processed Parquet files
+  *   as part of the savepoint state. This prevents the same Parquet file from
+  *   being migrated more than once if the job is restarted or savepoints are
+  *   resumed.
+  *
+  *   Set this to `false` to keep the legacy behavior where Parquet files are not
+  *   tracked in savepoints. Disabling tracking may be useful for backwards
+  *   compatibility with older savepoints or when file tracking is handled by an
+  *   external mechanism, but it means repeated runs may reprocess the same
+  *   Parquet files.
+  */
savepoints: Savepoints,
skipTokenRanges: Option[Set[(Token[_], Token[_])]],
skipSegments: Option[Set[Int]],
skipParquetFiles: Option[Set[String]],
Copilot AI commented on Jan 22, 2026:
Missing documentation for the new skipParquetFiles configuration field. This field should have Scaladoc comments explaining its purpose, format (Set of file URIs), and how it's used in the Parquet migration process. Without documentation, users won't understand what format the file paths should be in or when this field is populated.
    val message = "Several FileSourceScanExec were found in plan"
    logger.error(s"$message. Plan: ${plan.treeString}")
    throw new IllegalArgumentException(message)
  case list if list.isEmpty =>
    val message = "DataFrame is not based on file source (FileSourceScanExec not found in plan)"
    logger.error(s"$message. Plan: ${plan.treeString}")
    throw new IllegalArgumentException(message)
Copilot AI commented on Jan 22, 2026:
Inconsistent indentation detected. Lines 31-32 and 35-36 use tabs instead of spaces. The codebase should use consistent spacing (2 spaces based on other files in the project). Please replace tabs with spaces to maintain consistency.
Suggested change (replace tabs with two-space indentation):
    val message = "Several FileSourceScanExec were found in plan"
    logger.error(s"$message. Plan: ${plan.treeString}")
    throw new IllegalArgumentException(message)
  case list if list.isEmpty =>
    val message = "DataFrame is not based on file source (FileSourceScanExec not found in plan)"
    logger.error(s"$message. Plan: ${plan.treeString}")
    throw new IllegalArgumentException(message)
package com.scylladb.migrator.alternator

import org.apache.spark.util.AccumulatorV2
import java.util.concurrent.atomic.AtomicReference

/**
 * Accumulator for tracking processed Parquet file paths during migration.
 *
 * This accumulator collects the set of Parquet file paths that have been processed
 * as part of a migration job. It is useful for monitoring progress, avoiding duplicate
 * processing, and debugging migration workflows. The accumulator is thread-safe and
 * can be used in distributed Spark jobs.
 *
 * @param initialValue The initial set of processed file paths (usually empty).
 */
class StringSetAccumulator(initialValue: Set[String] = Set.empty)
    extends AccumulatorV2[String, Set[String]] {

  private val ref = new AtomicReference(initialValue)

  // Note: isZero may be momentarily inconsistent in concurrent scenarios,
  // as it reads the current value of the set without synchronization.
  // This is eventually consistent and thread-safe, but may not reflect the most recent updates.
  def isZero: Boolean = ref.get.isEmpty
  def copy(): StringSetAccumulator = new StringSetAccumulator(ref.get)
  def reset(): Unit = ref.set(Set.empty)
  def add(v: String): Unit = ref.getAndUpdate(_ + v)

  def merge(other: AccumulatorV2[String, Set[String]]): Unit =
    ref.getAndUpdate(_ ++ other.value)

  def value: Set[String] = ref.get
}

object StringSetAccumulator {
  def apply(initialValue: Set[String] = Set.empty): StringSetAccumulator =
    new StringSetAccumulator(initialValue)
}
Copilot AI commented on Jan 22, 2026:
The StringSetAccumulator class is placed in the alternator package, but it's a general-purpose utility used for Parquet migration tracking. This creates an incorrect package coupling. Consider moving this class to a more appropriate location such as com.scylladb.migrator.util or com.scylladb.migrator.accumulator to better reflect its purpose and avoid confusion.
please add to respective section of
No description provided.